/*************************************************************************
	> File Name: pmw-server.c
	> Author: wtao, Fierralin
	> Mail: tao.wang0221@gmail.com, fierralin@hotmail.com
 ************************************************************************/

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <math.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <time.h>
#include <unistd.h>


#include "pmw-server.h"
#include "event2/pmw.h"

/*
 * pmw_init - one-time setup of the middleware server.
 *
 * Initialises the global mutexes, the pipe pairing table, the per-fd link
 * tables and the fd hash map; opens a non-blocking TCP listening socket on
 * BIND_PORT; creates the listener epoll instance, then one epoll instance and
 * one nfv_process_thread per entry of nfv_process_ctx; finally starts the
 * acceptor thread (nfv_thread).
 *
 * Every setup failure is fatal: a message is printed and the process exits
 * with status -1.
 */
void pmw_init()
{
	int i;
	int flags;
	int err;

	pthread_mutex_init(&que_lock, NULL);
	pthread_mutex_init(&vis_fd_lock, NULL);

	memset(pipe_pair, 0, sizeof(pipe_pair));

	for (i = 0; i < MAXFD; i++) {
		ep_link[i].head = NULL;
		nf_link[i].head = NULL;
		nf_link[i].tail = NULL;
		nf_link[i].scur = NULL;
	}

	// init hash table
	ht = hashmapCreate(0);
	if (ht == NULL) {
		fprintf(stderr, "creating hash table error!\n");
		exit(-1);
	}
	pthread_mutex_init(&ht->hashlock, NULL);

	printf("pwm init......\n");
	// Tao: create a thread for connection from NFV
	serv_fd = socket(AF_INET, SOCK_STREAM, 0);
	if (serv_fd < 0) {
		fprintf(stderr, "creating serv_fd error!\n");
		exit(-1);
	}
	// set serv_fd to non-blocking; the file status flags must be read with
	// F_GETFL (F_GETFD returns the descriptor flags, e.g. FD_CLOEXEC)
	flags = fcntl(serv_fd, F_GETFL, 0);
	if (flags == -1 || fcntl(serv_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
		fprintf(stderr, "setting serv_fd non-blocking error!\n");
		exit(-1);
	}
	// listen on every local address at BIND_PORT
	memset(&serv_addr_in, 0, sizeof(serv_addr_in));
	serv_addr_in.sin_family = AF_INET;
	serv_addr_in.sin_addr.s_addr = htonl(INADDR_ANY);
	serv_addr_in.sin_port = htons(BIND_PORT);
	//bind
	if (bind(serv_fd, (struct sockaddr *)&serv_addr_in, sizeof(serv_addr_in)) == -1) {
		fprintf(stderr, "binding serv_fd error\n");
		exit(-1);
	}
	// listen
	if (listen(serv_fd, MAXNEVENTS) == -1) {
		fprintf(stderr, "listening serv_fd error\n");
		exit(-1);
	}
	// epoll fd
	serv_epfd = epoll_create(MAXNEVENTS);
	printf("pmw_init(): serv_epfd --> %d\n", serv_epfd);
	if (serv_epfd < 0) {
		fprintf(stderr, "creating epoll fd error\n");
		exit(-1);
	}
	// nfv_process_threads: one epoll set and one worker per context
	for (i = 0; i < NUM_NFV_THREADS; i++) {
		if ((nfv_process_ctx[i].epfd = epoll_create(MAXNEVENTS)) == -1) {
			perror("epoll_create() failed for nfv_process_ctx\n");
			exit(-1);
		}
		// pthread_create reports failure through a positive error number
		err = pthread_create(&(nfv_process_ctx[i].pid), NULL,
							 (void *(*)(void *))nfv_process_thread,
							 (void *)(&nfv_process_ctx[i]));
		if (err != 0) {
			fprintf(stderr, "creating nfv_process_thread error: %s\n",
					strerror(err));
			exit(-1);
		}
	}
	// create the acceptor thread
	err = pthread_create(&thread_id, NULL, (void *(*)(void *))nfv_thread, NULL);
	if (err != 0) {
		fprintf(stderr, "creating the thread error\n");
		exit(-1);
	}
}

void nfv_thread()
{
	struct epoll_event ev;
	struct epoll_event epevents[MAXNEVENTS];
	ev.events = EPOLLIN;
	ev.data.fd = serv_fd;
	if (epoll_ctl(serv_epfd, EPOLL_CTL_ADD, serv_fd, &ev) < 0) {
		fprintf(stderr, "epoll set serv_fd EPOLLIN error!\n");
		exit(-1);
	}
	// the for loop
	for (;;) {
		int res;
		// TODO: we may vary this timeouts
		res = epoll_wait(serv_epfd, epevents, MAXNEVENTS, 1);

		int i;
		int connfd, fd;
		int nsend, nrecv;
		struct sockaddr_in cli_addr_in;
		socklen_t clilen = sizeof(struct sockaddr_in);	
		for (i = 0; i < res; i++) {
			fd = epevents[i].data.fd;

			// process epollin event
			if (fd == serv_fd) {
				while ((connfd = accept(serv_fd, (struct sockaddr *)&cli_addr_in,
										&clilen)) > 0) {
					if (fcntl(connfd, F_SETFL, fcntl(connfd, F_GETFD, 0)
							  | O_NONBLOCK) == -1) {
						fprintf(stderr, "setting connfd non-blocking error!\n");
						exit(-1);
					}
					ev.events = EPOLLIN | EPOLLOUT;
					ev.data.fd = connfd;
					pthread_mutex_lock(&vis_fd_lock);
					initial_data(connfd, CONN_NFV, 0, 0, 0);
					pthread_mutex_unlock(&vis_fd_lock);

					int tid = connfd % NUM_NFV_THREADS;
					struct nfv_process_contex *pth_ctx = nfv_process_ctx + tid;
					epoll_ctl(pth_ctx->epfd, EPOLL_CTL_ADD, connfd, &ev);
					struct linked_fd *tmp;
					tmp = (struct linked_fd *)malloc(sizeof(struct linked_fd));
					if (tmp != NULL) {
						tmp->tfd = connfd;
						tmp->next = NULL;
					}
					else {
						printf("nfv_thread(): cannot malloc struct linked_fd\n");
					}
					tmp->next = ep_link[pth_ctx->epfd].head;
					ep_link[pth_ctx->epfd].head = tmp;
					printf("nfv_thread(): accpeted nfv fd --> %d\n", connfd);
				}
			} // finished if (fd == serv_fd)
			else { // when fd is not serv_fd
				printf("### nfv_thread, why you come here!!!!!!!!!\n");
			} // end else fd != serv_fd
		} // finish for each event
	} // finish for (;;)
}

/*
 * nfv_process_thread - worker loop for the NFV sockets of one epoll set.
 *
 * arg points at this worker's struct nfv_process_contex.  The loop never
 * returns; on every pass it
 *   1. waits up to 1 ms on the worker's epoll fd,
 *   2. for readable CONN_NFV sockets, reads into the connection's rbuf until
 *      the rbuf is full, read() fails, or the peer closes,
 *   3. for writable CONN_NFV sockets (only when EPOLLIN was not also set for
 *      that event), writes the connection's sbuf to the socket,
 *   4. walks ep_link[epfd] and, for every fd on it, feeds buffered input to
 *      decompose_packet() and refills the output buffer via compose_packet().
 *
 * A connection whose read() or write() returns 0 is closed and removed from
 * the ep_link list.
 */
void nfv_process_thread(void *arg) {
	struct nfv_process_contex *pth = (struct nfv_process_contex *)arg;
	struct epoll_event events[MAXNEVENTS];
	int sfd, epfd;
	int nevents, tmpi;

	// pthread_t is not an unsigned int; print it through an explicit cast
	printf("pwm nfv_process_thread: %lu\n", (unsigned long)pth->pid);
	epfd = pth->epfd;

	while (1) {
		nevents = epoll_wait(epfd, events, MAXNEVENTS, 1);
		for (tmpi = 0; tmpi < nevents; tmpi++) {
			sfd = events[tmpi].data.fd;
			if (events[tmpi].events & EPOLLIN) {
				if (nf_link[sfd].state == CONN_NFV) {
					int done_flag = 0;
					while (!done_flag) {
						// never read more than the rbuf can still hold
						int buf_len = rbuf_getsparelen(&(nf_link[sfd].rbuf));
						if (buf_len > 0) {
							char tmp_buf[MAXLEN + 240];
							int nrecv = read(sfd, tmp_buf, buf_len);
							if (nrecv < 0) {
								if (errno != EAGAIN && errno != EWOULDBLOCK) {
									// TODO: we handle error here
									printf("error with nrecv < 0\n");
								}
								done_flag = 1;
							}

							else if (nrecv == 0) {
								// the peer closed the connection
								printf("nrecv is zero, the remote connection is down\n");
								close(sfd);
								epfd_delete(epfd, sfd);
								done_flag = 1;
							}

							else {
								rbuf_write(&(nf_link[sfd].rbuf), tmp_buf, nrecv);
							}
						}
						else {
							done_flag = 1;
						}
					}

				} // here only process CONN_NFV coming in data
			} // finish if (events[tmpi].events & EPOLLIN)
			else if (events[tmpi].events & EPOLLOUT) {
				if (nf_link[sfd].state == CONN_NFV) {
					int done_flag = 0;
					while (!done_flag) {
						int buf_len = rbuf_getlength(&(nf_link[sfd].sbuf));

						if (buf_len > 0) {
							char tmp_buf[MAXLEN];
							// peek at the pending bytes; they are only consumed
							// from sbuf once write() reports how many were sent
							rbuf_get(&(nf_link[sfd].sbuf), tmp_buf, buf_len);
							int nsend = write(sfd, tmp_buf, buf_len);
							if (nsend > 0) {
								// Tao: we can use rbuf_read or rbuf_release here
								rbuf_read(&(nf_link[sfd].sbuf), tmp_buf, nsend);
							}
							else if (nsend < 0) {
								if (errno != EAGAIN && errno != EWOULDBLOCK) {
									// TODO: we handle error here
									printf("[CONN_NFV]epoll out error");
								}
								// either way, wait for the next EPOLLOUT
								done_flag = 1;
							}
							else {
								printf("nsend is 0, the remote connection is down\n");
								close(sfd);
								epfd_delete(epfd, sfd);
								done_flag = 1;
							}
						}
						else {
							done_flag = 1;
						}
					}
				} // here only process CONN_NFV coming in data too
			} // finish else if (events[tmpi].events & EPOLLOUT)

		} // finish for nevents
		// check the input and output buffer here
		struct linked_fd *p = ep_link[epfd].head;
		while (p) {
			// check the input
			int fd = p->tfd;
			int buf_len = rbuf_getlength(&(nf_link[fd].rbuf));
			int ana_len = 0;

			// parse only once at least one inner header has arrived
			if (buf_len > 0 && buf_len >= INHLEN) {
				char tmp_buf[MAXLEN + 240];
				rbuf_get(&(nf_link[fd].rbuf), tmp_buf, buf_len);
				ana_len = decompose_packet(fd, tmp_buf, buf_len); // process data
				rbuf_release(&nf_link[fd].rbuf, ana_len); // release rbuf
			}

			// compose the ouput buf (i.e. sbuf)
			buf_len = rbuf_getsparelen(&(nf_link[fd].sbuf));
			if (buf_len > 0)
				compose_packet(fd);
			p = p->next;
		} // finish while (p)
	} // finish while (1)
}

/*
 * pmw_fini - shutdown hook; currently only announces itself on stdout.
 */
void pmw_fini() {
	fputs("pwm finish......\n", stdout);
}

/*
 * decompose_packet - split a byte stream received from an NFV connection
 * into its inner messages and dispatch them.
 *
 * fd   : the NFV connection the bytes were read from.
 * data : contiguous copy of the buffered bytes.
 * len  : number of valid bytes in data.
 *
 * Each message starts with an INNER_H header of INHLEN bytes:
 *   CONNECT   - allocate a fake fd, queue it in conn_que for pm_accept(),
 *               record it in the hash map under (src_ip << 16) + port and
 *               initialise it as a CONN_DATA endpoint linked to fd.
 *   S_REQUEST - append the len_msg payload bytes to the rbuf of the fake fd
 *               registered for (src_ip, port) and wake its epoll waiter.
 *
 * Parsing stops at the first incomplete message (header or payload not fully
 * present), at a payload that does not fit into the destination rbuf, or at
 * an unknown type.
 *
 * Returns the number of bytes consumed, which never exceeds len; the caller
 * releases exactly that many bytes and retries the remainder later.
 *
 * NOTE(review): hashmapGet() returns 0 for a missing key, so an S_REQUEST for
 * an unknown (src_ip, port) is delivered to fd 0 -- confirm how misses should
 * be handled.
 */
int decompose_packet(int fd, char *data, uint32_t len)
{
	uint32_t parsed_size = 0;
	char *cur = data; // current process position

	// a message is only looked at once its whole header is available
	while (len - parsed_size >= INHLEN) {
		INNER_H *inh_p = (INNER_H *)cur;
		uint8_t itype = inh_p->type;
		if (CONNECT == itype) {
			struct in_process c_tmp;
			c_tmp.ip = inh_p->src_ip;
			c_tmp.port = ntohs(inh_p->port);
			c_tmp.fd = find_fake_fd();
			// new connections are reported on the global main socket fd
			c_tmp.lk_fd = mc_main_fd;

			pthread_mutex_lock(&que_lock);
			conn_que[que_st] = c_tmp;
			que_st = (que_st + 1) % MAXFD;
			que_len += 1;
			pthread_mutex_unlock(&que_lock);

			pthread_mutex_lock(&ht->hashlock);
			hashmapInsert(ht, c_tmp.fd, (c_tmp.ip << 16) + c_tmp.port);
			pthread_mutex_unlock(&ht->hashlock);

			// initialize data
			initial_data(c_tmp.fd, CONN_DATA, c_tmp.ip, c_tmp.port, fd);
			cur = cur + INHLEN;
			parsed_size = parsed_size + INHLEN;
		}
		else if (S_REQUEST == itype) { // process single packet
			uint8_t src_ip = inh_p->src_ip;
			uint16_t port = ntohs(inh_p->port);
			uint16_t reqlen = ntohs(inh_p->len_msg);
			int tfd;

			// payload not completely received yet: leave the whole message
			// in the caller's buffer and try again on the next pass
			if (len - parsed_size - INHLEN < reqlen)
				break;

			// same lock as the insert above: the table may be rehashed by
			// another thread while we probe it
			pthread_mutex_lock(&ht->hashlock);
			tfd = hashmapGet(ht, (src_ip << 16) + port);
			pthread_mutex_unlock(&ht->hashlock);

			// writing more than the spare room would overrun the ring
			// buffer; keep the message queued until the reader drains it
			if (rbuf_getsparelen(&(nf_link[tfd].rbuf)) < (int)reqlen)
				break;

			rbuf_write(&(nf_link[tfd].rbuf), cur + INHLEN, reqlen);
			if (pthread_mutex_trylock(&ep_link[nf_link[tfd].epfd].lock) == 0) {
				pthread_cond_signal(&ep_link[nf_link[tfd].epfd].sig);
				pthread_mutex_unlock(&ep_link[nf_link[tfd].epfd].lock);
			}
			cur = cur + INHLEN + reqlen;
			parsed_size = parsed_size + INHLEN + reqlen;
		}
		else {
			printf("decompose_packet(): what the fuck? in inner decompose type[%d]\n", itype);
			break;
		}
	}

	return parsed_size;
}

/*
 * compose_packet - fill the send buffer of connection fd with framed data
 * taken from the send buffers of the fake fds linked to it.
 *
 * Starting at nf_link[fd].scur the linked list is walked once around.  For
 * every linked fd with pending output, as many bytes as still fit are moved
 * out of its sbuf, prefixed with an INNER_H header (type S_RESPONSE, source
 * ip/port of the linked fd, num_msg 1, len_msg = payload size) and staged in
 * a local buffer.  When the staging area is full, scur is moved to the fd
 * where the next call should resume.  The staged bytes are then appended to
 * fd's sbuf.
 *
 * fd's sbuf spin lock is held for the whole call; each linked fd's sbuf lock
 * is held only while its data is being drained.
 */
void compose_packet(int fd)
{
	char tmp_buf[MAXLEN];
	int num_msg = 0;
	int nwritten = 0;
	int i;

	pthread_spin_lock(&(nf_link[fd].sbuf.lock));
	// free room in the output buffer == upper bound for what we may stage
	int buf_len = nf_link[fd].sbuf.capacity - nf_link[fd].sbuf.length;

	struct linked_fd *p = nf_link[fd].scur;	// point to the first sbuf element
	while (p) {
		i = p->tfd;
		pthread_spin_lock(&(nf_link[i].sbuf.lock));
		if (nf_link[i].sbuf.length > 0) {
			// Tao: how much we have to send
			int tmp_len = nf_link[i].sbuf.length;
			int remain_spare = buf_len - nwritten - INHLEN;
			int act_sndbytes =
					tmp_len < remain_spare ? tmp_len : remain_spare;

			if (act_sndbytes <= 0) {
				// no room for even one payload byte: release this fd's lock,
				// resume from it next time and flush what is staged so far
				pthread_spin_unlock(&(nf_link[i].sbuf.lock));
				nf_link[fd].scur = p;
				break;
			}

			// payload goes right behind the header slot
			char *wd_buf = tmp_buf + nwritten;
			INNER_H *inh_p = (INNER_H *)wd_buf;
			rbuf_read_nolock(&nf_link[i].sbuf, wd_buf + INHLEN, act_sndbytes);
			pthread_spin_unlock(&(nf_link[i].sbuf.lock));

			// craft the INNER_H part
			inh_p->type = S_RESPONSE;
			inh_p->src_ip = nf_link[i].src_ip;
			inh_p->port = htons(nf_link[i].port);
			inh_p->num_msg = htons((uint16_t)1);
			inh_p->len_msg = htons((uint16_t)act_sndbytes);

			num_msg++;
			nwritten += INHLEN + act_sndbytes;
		}
		else {
			pthread_spin_unlock(&(nf_link[i].sbuf.lock));
		}

		p = p->next;
		if (p == nf_link[fd].scur)
			break;		// went once around the list
		else if (buf_len <= (nwritten + INHLEN)) {
			nf_link[fd].scur = p;	// full: continue here on the next call
			break;
		}
	} // finish while(p)

	if (num_msg > 0)
		rbuf_write_nolock(&nf_link[fd].sbuf, tmp_buf, nwritten);
	pthread_spin_unlock(&(nf_link[fd].sbuf.lock));
}

// wrapper of funcs

/*
 * pm_epoll_wait - epoll_wait() replacement working on the fake-fd tables.
 *
 * epfd      : epoll fd whose ep_link list is scanned.
 * events    : output array, at least maxevents entries.
 * maxevents : capacity of events; must be > 0 (otherwise -1/EINVAL).
 * timeout   : maximum wait before scanning.  NOTE(review): the value is
 *             added to the current time as microseconds, whereas
 *             epoll_wait() takes milliseconds -- confirm which unit callers
 *             expect.  Values <= 0 do not wait.
 *
 * The call first blocks on ep_link[epfd].sig until signalled or until the
 * deadline passes (skipped if the lock is busy), then reports:
 *   CONN_LISTENING       - one EPOLLIN event (data.fd = queued lk_fd) per
 *                          connection waiting in conn_que,
 *   CONN_DATA, CONN_PIPE - EPOLLIN if rbuf holds data, EPOLLOUT if sbuf has
 *                          room, masked with the fd's tr_state.
 * Entries with an out-of-range fd or is_active == 0 are skipped.
 *
 * Returns the number of events stored in events.
 */
int pm_epoll_wait(int epfd, struct epoll_event *events, 
		int maxevents, int timeout)
{
	int res = 0;
	struct linked_fd *p;
	struct timeval tvs;
	struct timespec waittime;
	long long usec;

	if (maxevents <= 0) {
		errno = EINVAL;
		return -1;
	}

	// absolute deadline; normalised so that tv_nsec stays below one second
	gettimeofday(&tvs, NULL);
	usec = (long long)tvs.tv_usec + (timeout > 0 ? timeout : 0);
	waittime.tv_sec = tvs.tv_sec + (time_t)(usec / 1000000);
	waittime.tv_nsec = (long)(usec % 1000000) * 1000;

	if (pthread_mutex_trylock(&ep_link[epfd].lock) == 0) {
		pthread_cond_timedwait(&ep_link[epfd].sig, &ep_link[epfd].lock, &waittime);
		pthread_mutex_unlock(&ep_link[epfd].lock);
	}

	// read the list head only after the wait: entries may have been added
	// or removed while this thread was blocked
	for (p = ep_link[epfd].head; p != NULL && res < maxevents; p = p->next) {
		int fd = p->tfd;

		if (fd <= 0 || fd >= MAXFD)	// nf_link has MAXFD slots
			continue;
		if (nf_link[fd].is_active == 0)
			continue;

		if (nf_link[fd].state == CONN_LISTENING) {
			pthread_mutex_lock(&que_lock);
			int qlen = que_len, qed = que_ed;
			pthread_mutex_unlock(&que_lock);

			while (qlen-- > 0 && res < maxevents) {
				events[res].events = EPOLLIN;
				events[res].data.fd = conn_que[qed].lk_fd;
				res++;
				qed = (qed + 1) % MAXFD;
			}
		}
		else if (nf_link[fd].state == CONN_DATA ||
				 nf_link[fd].state == CONN_PIPE) {
			int tr_flag = 0;
			if (rbuf_getlength(&(nf_link[fd].rbuf)) > 0)
				tr_flag = tr_flag | EPOLLIN;
			if (rbuf_getsparelen(&(nf_link[fd].sbuf)) > 0)
				tr_flag = tr_flag | EPOLLOUT;

			// report only what the caller registered interest in
			tr_flag = tr_flag & (nf_link[fd].tr_state);
			if (tr_flag != 0) {
				events[res].events = tr_flag;
				events[res].data.fd = fd;
				res++;
			}
		}
		else {
			printf("pm_epoll_wait(): error something on nf_link state\n");
		}
	}
	return res;
}

/*
 * pm_socket - socket() replacement.
 *
 * The arguments are ignored.  A fresh fake fd is reserved, initialised as a
 * CONN_LISTENING endpoint and remembered in mc_main_fd.
 *
 * Returns the fake fd.
 */
int pm_socket(int domain, int type, int protocol)
{
	const int listen_fd = find_fake_fd();

	(void)domain;
	(void)type;
	(void)protocol;

	initial_data(listen_fd, CONN_LISTENING, 0, 0, 0);
	mc_main_fd = listen_fd;

	return listen_fd;
}

/*
 * pm_accept - accept() replacement: pop the oldest pending connection from
 * conn_que.
 *
 * sockfd, addr and addrlen are ignored (no peer address is reported).
 *
 * Returns the fake fd of the dequeued connection, or -1 with errno set to
 * EAGAIN when no connection is pending.
 */
int pm_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
	int ret = -1;

	(void)sockfd;
	(void)addr;
	(void)addrlen;

	pthread_mutex_lock(&que_lock);
	if (que_len != 0) {
		ret = conn_que[que_ed].fd;
		que_ed = (que_ed + 1) % MAXFD;
		que_len--;
	}
	pthread_mutex_unlock(&que_lock);
	printf("pm_accpet() with ret: %d\n", ret);

	// set errno last so the printf above cannot clobber it
	if (ret == -1)
		errno = EAGAIN;
	return ret;
}

int pm_epoll_create(int size) {
	int epfd = epoll_create(size);
	if (epfd > 0) {
		pthread_mutex_init(&ep_link[epfd].lock, NULL);
		pthread_cond_init(&ep_link[epfd].sig, NULL);
	}
	return epfd;
}

/*
 * pm_epoll_ctl - epoll_ctl() replacement operating on the fake-fd tables.
 *
 * EPOLL_CTL_ADD : push fd on ep_link[epfd], then behave like MOD.
 * EPOLL_CTL_MOD : store the requested event mask (ev->events) in the fd's
 *                 tr_state, mark it active, bind it to epfd and wake a
 *                 waiter of pm_epoll_wait().
 * EPOLL_CTL_DEL : unlink the first list entry for fd (if any), clear
 *                 tr_state and mark the fd inactive.
 *
 * tr_state is the mask pm_epoll_wait() applies to EPOLLIN/EPOLLOUT, so it
 * must hold the caller's event mask rather than the operation code.
 *
 * Returns 0 on success, -1 on an unknown op, a NULL ev for ADD/MOD (errno =
 * EFAULT) or an allocation failure.
 */
int pm_epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev)
{
	if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD) {
		if (ev == NULL) {
			errno = EFAULT;
			return -1;
		}

		if (op == EPOLL_CTL_ADD) {
			// add to the ep_link[epfd]
			struct linked_fd *tmp = (struct linked_fd *)malloc(sizeof(struct linked_fd));
			if (tmp == NULL) {
				printf("pm_epoll_ctl(): can not malloc\n");
				return -1;
			}
			tmp->tfd = fd;
			tmp->next = ep_link[epfd].head;
			ep_link[epfd].head = tmp;
		}

		// remember which events the caller wants reported
		nf_link[fd].tr_state = ev->events;
		nf_link[fd].is_active = 1;
		nf_link[fd].epfd = epfd;

		// wake a thread blocked in pm_epoll_wait() so it rescans the list
		if (pthread_mutex_trylock(&ep_link[epfd].lock) == 0) {
			pthread_cond_signal(&ep_link[epfd].sig);
			pthread_mutex_unlock(&ep_link[epfd].lock);
		}
	}
	else if (op == EPOLL_CTL_DEL) {
		// del from the ep_link[epfd]: walk the links so that head and inner
		// nodes are unlinked the same way
		struct linked_fd **link = &ep_link[epfd].head;
		while (*link != NULL) {
			if ((*link)->tfd == fd) {
				struct linked_fd *victim = *link;
				*link = victim->next;
				free(victim);
				break;
			}
			link = &(*link)->next;
		}

		nf_link[fd].tr_state = 0;
		nf_link[fd].is_active = 0;
	}
	else {
		// Tao: this may never happen
		return -1;
	}
	return 0;
}

ssize_t pm_write(int fd, const char *buf, size_t count)
{
	int tfd = fd;
	if (nf_link[fd].state == CONN_PIPE) {
		tfd = pipe_pair[fd];
		//printf("########## pm_write PIPE tfd-type [%d], fd-type [%x]\n", nf_link[tfd].state);
		ssize_t avai_len = (ssize_t)rbuf_getsparelen(&nf_link[tfd].rbuf);
		// printf("pm_write(): %f\n", avai_len);
		if (avai_len<0)
		{
			// Tao: there is no error, just no space, we return 0
			return 0;
		}
		if (avai_len<count) {
			rbuf_write(&(nf_link[tfd].rbuf), buf, avai_len);
			// printf("pm_write(): done with %d\n", rbuf_getlength(&(nf_link[tfd].sbuf)));
			return avai_len;
		}
		else {
			rbuf_write(&(nf_link[tfd].rbuf), buf, count);
			// printf("pm_write(): done with %d\n", rbuf_getlength(&(nf_link[tfd].sbuf)));
			return count;
		}
	}
	// printf("pm_write(): on fd %d, why?(%d)\n", tfd, nf_link[fd].state);
	// Tao: the fd is the fake fd
	ssize_t avai_len = (ssize_t)rbuf_getsparelen(&nf_link[tfd].sbuf);
	// printf("pm_write(): %f\n", avai_len);
	if (avai_len<0)
	{
		// Tao: there is no error, just no space, we return 0
		return 0;
	}
	if (avai_len<count) {
		rbuf_write(&(nf_link[tfd].sbuf), buf, avai_len);
		// printf("pm_write(): done with %d\n", rbuf_getlength(&(nf_link[tfd].sbuf)));
		return avai_len;
	}
	else {
		rbuf_write(&(nf_link[tfd].sbuf), buf, count);
		// printf("pm_write(): done with %d\n", rbuf_getlength(&(nf_link[tfd].sbuf)));
		return count;
	}

}

/*
 * pm_read - read() replacement for fake fds.
 *
 * Copies up to count bytes out of the fd's receive buffer into buf.
 *
 * Returns the number of bytes copied; 0 when nothing is buffered.
 */
ssize_t pm_read(int fd, char *buf, size_t count)
{
	ssize_t pending = (ssize_t)rbuf_getlength(&nf_link[fd].rbuf);
	int nbytes;

	if (pending < 0) {
		// Tao: no data and error, just return 0
		return 0;
	}

	// hand out the smaller of "what is buffered" and "what was asked for"
	if (pending < count)
		nbytes = pending;
	else
		nbytes = count;

	rbuf_read(&(nf_link[fd].rbuf), buf, nbytes);
	return nbytes;
}

/*
 * pm_sendmsg - sendmsg() replacement.
 *
 * Forwards the message's iovec array to pm_writev(); flags and all other
 * msghdr fields are ignored.
 *
 * Returns whatever pm_writev() returns.
 */
ssize_t pm_sendmsg(int fd, const struct msghdr *msg, int flags)
{
	// Tao: we may lose some control flags here
	const struct iovec *vec = msg->msg_iov;
	int vec_count = msg->msg_iovlen;

	(void)flags;
	return pm_writev(fd, vec, vec_count);
}


/*
 * pm_writev - writev() replacement for fake fds.
 *
 * Appends the iovec segments, in order, to the fd's send buffer until either
 * all segments are stored or the buffer has no spare room left.  If anything
 * was written to a CONN_DATA fd, a thread waiting on the fd's epoll set is
 * signalled (skipped when the lock is busy).
 *
 * Returns the total number of bytes stored.
 */
ssize_t pm_writev(int fd, const struct iovec *iov, int iovcnt)
{
	ssize_t room = (ssize_t)rbuf_getsparelen(&nf_link[fd].sbuf);
	ssize_t total = 0;
	int idx;

	if (room < 0) {
		// Tao: no error and data, just return 0
		return 0;
	}

	for (idx = 0; idx < iovcnt && room > 0; idx++) {
		ssize_t want = (ssize_t)iov[idx].iov_len;
		ssize_t chunk = room < want ? room : want;

		rbuf_write(&(nf_link[fd].sbuf), iov[idx].iov_base, chunk);

		total += chunk;
		room -= chunk;
	}

	if (total > 0 && nf_link[fd].state == CONN_DATA) {
		if (pthread_mutex_trylock(&ep_link[nf_link[fd].epfd].lock) == 0) {
			pthread_cond_signal(&ep_link[nf_link[fd].epfd].sig);
			pthread_mutex_unlock(&ep_link[nf_link[fd].epfd].lock);
		}
	}
	return total;
}

/*
 * pm_readv - readv() replacement for fake fds.
 *
 * Fills the iovec segments, in order, from the fd's receive buffer until the
 * segments are exhausted or no buffered data is left.
 *
 * Returns the total number of bytes copied.
 */
ssize_t pm_readv(int fd, const struct iovec *iov, int iovcnt)
{
	ssize_t pending = (ssize_t)rbuf_getlength(&nf_link[fd].rbuf);
	ssize_t total = 0;
	int idx;

	for (idx = 0; idx < iovcnt && pending > 0; idx++) {
		ssize_t seg_len = (ssize_t)iov[idx].iov_len;
		ssize_t chunk = pending < seg_len ? pending : seg_len;

		rbuf_read(&(nf_link[fd].rbuf), iov[idx].iov_base, chunk);

		pending -= chunk;
		total += chunk;
	}
	return total;
}

/*
 * pm_fcntl - fcntl() stub: ignores its arguments and always reports success.
 * The real fcntl() is not called.
 */
int pm_fcntl(int fd, int cmd, long arg)
{
	(void)fd;
	(void)cmd;
	(void)arg;
	return 0;
}

/*
 * pm_bind - bind() stub: ignores its arguments and always reports success.
 * The real bind() is not called.
 */
int pm_bind(int sockfd, struct sockaddr *addr, socklen_t addrlen)
{
	(void)sockfd;
	(void)addr;
	(void)addrlen;
	return 0;
}

/*
 * pm_listen - listen() stub: ignores its arguments and always reports
 * success.  The real listen() is not called.
 */
int pm_listen(int sockfd, int backlog)
{
	(void)sockfd;
	(void)backlog;
	return 0;
}

/*
 * pm_getsockopt - getsockopt() stub: ignores its arguments, leaves optval and
 * optlen untouched and always reports success.
 */
int pm_getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen)
{
	(void)sockfd;
	(void)level;
	(void)optname;
	(void)optval;
	(void)optlen;
	return 0;
}

/*
 * pm_setsockopt - setsockopt() stub: ignores its arguments and always
 * reports success.  No socket option is actually changed.
 */
int pm_setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen)
{
	(void)sockfd;
	(void)level;
	(void)optname;
	(void)optval;
	(void)optlen;
	return 0;
}

/*
 * pm_pipe - pipe() replacement built from two fake fds.
 *
 * Reserves two pipe fds, records each as the other's peer in pipe_pair and
 * initialises both as CONN_PIPE endpoints.  The two fds are stored in
 * pipefd[0] and pipefd[1].
 *
 * Always returns 0.
 */
int pm_pipe(int pipefd[2])
{
	int end_a = find_pipe_fd();
	int end_b = find_pipe_fd();

	pipefd[0] = end_a;
	pipefd[1] = end_b;
	printf("pm_pipe(): created pipe fd %d %d\n", end_a, end_b);

	// each end looks up its peer through pipe_pair
	pipe_pair[end_a] = end_b;
	pipe_pair[end_b] = end_a;

	initial_data(end_a, CONN_PIPE, 0, 0, 0);
	initial_data(end_b, CONN_PIPE, 0, 0, 0);
	return 0;
}

void initial_data(int tfd, int type, uint8_t ip, uint16_t port, int fd)
{
	nf_link[tfd].nf_fd = tfd;
	// HEADER related
	//nf_link[tfd].ack = 0;
	nf_link[tfd].src_ip = ip;
	nf_link[tfd].port = port;
	nf_link[tfd].state = type;
	nf_link[tfd].is_active = 0;
	(nf_link[tfd].rbuf).buf = (char *)malloc(MAXLEN * sizeof(char));
	nf_link[tfd].rbuf.ind_st = nf_link[tfd].rbuf.ind_ed = nf_link[tfd].rbuf.length = 0;
	nf_link[tfd].rbuf.capacity = MAXLEN;
	pthread_spin_init(&(nf_link[tfd].rbuf.lock), NULL);
	(nf_link[tfd].sbuf).buf = (char *)malloc(MAXLEN * sizeof(char));
	nf_link[tfd].sbuf.ind_st = nf_link[tfd].sbuf.ind_ed = nf_link[tfd].sbuf.length = 0;
	nf_link[tfd].sbuf.capacity = MAXLEN;
	pthread_spin_init(&(nf_link[tfd].sbuf.lock), NULL);

	nf_link[tfd].head = NULL;

	nf_link[tfd].tr_state = 0;

	vis_fd[tfd] = 1;
	if (type == CONN_DATA) {
		struct linked_fd *tmp = (struct linked_fd *)malloc(sizeof(struct linked_fd));
		if (tmp == NULL) {
			printf("initial_data(): error malloc\n");
			return ;
		}
		tmp->tfd = tfd;
		if (nf_link[fd].head == NULL) {
			tmp->next = tmp; // tail->next = tmp
			nf_link[fd].tail = tmp;
			nf_link[fd].scur = tmp;
		}
		else {
			tmp->next = nf_link[fd].head;
			nf_link[fd].tail->next = tmp;
		}
		nf_link[fd].head = tmp;
	}
}


// ring buf ops
/*
 * rbuf_getlength - number of bytes currently stored in the ring buffer,
 * read under the buffer's spin lock.
 */
inline int rbuf_getlength(RINGBUF *rbuf)
{
	int used;

	pthread_spin_lock(&(rbuf->lock));
	used = rbuf->length;
	pthread_spin_unlock(&(rbuf->lock));

	return used;
}

/*
 * rbuf_getsparelen - number of bytes that can still be written to the ring
 * buffer (capacity minus stored length), read under the buffer's spin lock.
 */
inline int rbuf_getsparelen(RINGBUF *rbuf)
{
	int spare;

	pthread_spin_lock(&(rbuf->lock));
	spare = rbuf->capacity - rbuf->length;
	pthread_spin_unlock(&(rbuf->lock));

	return spare;
}

/*
 * rbuf_write - append size bytes from buf to the ring buffer while holding
 * its spin lock.
 *
 * The data is copied at the write index and wraps to the start of the
 * storage when it would run past the end.  No check is made that size fits
 * into the spare room; callers are expected to have checked.
 */
inline void rbuf_write(RINGBUF *rbuf, const char *buf, int size)
{
	pthread_spin_lock(&(rbuf->lock));

	if (!(rbuf->ind_ed + size > rbuf->capacity)) {
		// fits without wrapping
		memcpy(rbuf->buf + rbuf->ind_ed, buf, size);
		rbuf->ind_ed += size;
		if (rbuf->ind_ed >= rbuf->capacity)
			rbuf->ind_ed %= rbuf->capacity;
	}
	else {
		// split: first the part up to the end of storage, then the rest
		// from the beginning
		int head_part = rbuf->capacity - rbuf->ind_ed;
		int wrapped_part = size - head_part;

		memcpy(rbuf->buf + rbuf->ind_ed, buf, head_part);
		memcpy(rbuf->buf, buf + head_part, wrapped_part);
		rbuf->ind_ed = wrapped_part;
	}

	rbuf->length += size;

	pthread_spin_unlock(&(rbuf->lock));
}

/*
 * rbuf_write_nolock - append size bytes from buf to the ring buffer without
 * taking its spin lock.
 *
 * The data is copied at the write index and wraps to the start of the
 * storage when it would run past the end.  No check is made that size fits
 * into the spare room.
 */
inline void rbuf_write_nolock(RINGBUF *rbuf, const char *buf, int size)
{
	if (!(rbuf->ind_ed + size > rbuf->capacity)) {
		// fits without wrapping
		memcpy(rbuf->buf + rbuf->ind_ed, buf, size);
		rbuf->ind_ed += size;
		if (rbuf->ind_ed >= rbuf->capacity)
			rbuf->ind_ed %= rbuf->capacity;
	}
	else {
		// split across the end of storage
		int head_part = rbuf->capacity - rbuf->ind_ed;
		int wrapped_part = size - head_part;

		memcpy(rbuf->buf + rbuf->ind_ed, buf, head_part);
		memcpy(rbuf->buf, buf + head_part, wrapped_part);
		rbuf->ind_ed = wrapped_part;
	}

	rbuf->length += size;
}

/*
 * rbuf_read - remove size bytes from the ring buffer into buf while holding
 * its spin lock.
 *
 * Bytes are taken from the read index, wrapping to the start of the storage
 * when needed; the read index and stored length are updated.  No check is
 * made that size bytes are actually stored.
 */
inline void rbuf_read(RINGBUF *rbuf, char *buf, int size)
{
	pthread_spin_lock(&(rbuf->lock));

	if (!(rbuf->ind_st + size > rbuf->capacity)) {
		// contiguous region
		memcpy(buf, rbuf->buf + rbuf->ind_st, size);
		rbuf->ind_st += size;
		if (rbuf->ind_st >= rbuf->capacity)
			rbuf->ind_st %= rbuf->capacity;
	}
	else {
		// region wraps around the end of storage
		int head_part = rbuf->capacity - rbuf->ind_st;
		int wrapped_part = size - head_part;

		memcpy(buf, rbuf->buf + rbuf->ind_st, head_part);
		memcpy(buf + head_part, rbuf->buf, wrapped_part);
		rbuf->ind_st = wrapped_part;
	}

	rbuf->length -= size;

	pthread_spin_unlock(&(rbuf->lock));
}

/*
 * rbuf_read_nolock - remove size bytes from the ring buffer into buf without
 * taking its spin lock.
 *
 * Bytes are taken from the read index, wrapping to the start of the storage
 * when needed; the read index and stored length are updated.  No check is
 * made that size bytes are actually stored.
 */
inline void rbuf_read_nolock(RINGBUF *rbuf, char *buf, int size)
{
	if (!(rbuf->ind_st + size > rbuf->capacity)) {
		// contiguous region
		memcpy(buf, rbuf->buf + rbuf->ind_st, size);
		rbuf->ind_st += size;
		if (rbuf->ind_st >= rbuf->capacity)
			rbuf->ind_st %= rbuf->capacity;
	}
	else {
		// region wraps around the end of storage
		int head_part = rbuf->capacity - rbuf->ind_st;
		int wrapped_part = size - head_part;

		memcpy(buf, rbuf->buf + rbuf->ind_st, head_part);
		memcpy(buf + head_part, rbuf->buf, wrapped_part);
		rbuf->ind_st = wrapped_part;
	}

	rbuf->length -= size;
}

/*
 * rbuf_get - copy size bytes from the read position of the ring buffer into
 * buf without consuming them (indices and length stay untouched).  The copy
 * is done under the buffer's spin lock and wraps around the end of the
 * storage when needed.
 */
inline void rbuf_get(RINGBUF *rbuf, char *buf, int size)
{
	pthread_spin_lock(&(rbuf->lock));

	if (!(rbuf->ind_st + size > rbuf->capacity)) {
		memcpy(buf, rbuf->buf + rbuf->ind_st, size);
	}
	else {
		int head_part = rbuf->capacity - rbuf->ind_st;
		int wrapped_part = size - head_part;

		memcpy(buf, rbuf->buf + rbuf->ind_st, head_part);
		memcpy(buf + head_part, rbuf->buf, wrapped_part);
	}

	pthread_spin_unlock(&(rbuf->lock));
}

/*
 * rbuf_release - discard len bytes from the front of the ring buffer without
 * copying them anywhere.  Advances the read index (wrapping once past the
 * capacity) and shrinks the stored length, all under the spin lock.
 */
inline void rbuf_release(RINGBUF *rbuf, int len)
{
	pthread_spin_lock(&(rbuf->lock));

	rbuf->length -= len;
	rbuf->ind_st += len;
	if (!(rbuf->ind_st < rbuf->capacity))
		rbuf->ind_st -= rbuf->capacity;

	pthread_spin_unlock(&(rbuf->lock));
}

/*
 * find_pipe_fd - reserve an unused fake fd for a pipe end.
 *
 * Random candidates in [5000, MAXFD) are drawn until one is found whose
 * vis_fd entry is not 1; that entry is then marked as used.  The search and
 * the marking happen under vis_fd_lock.
 *
 * Returns the reserved fd.
 */
int find_pipe_fd()
{
	int candidate;

	pthread_mutex_lock(&vis_fd_lock);
	for (;;) {
		candidate = rand() % (MAXFD - 5000) + 5000;
		if (vis_fd[candidate] != 1)
			break;
	}
	vis_fd[candidate] = 1;
	pthread_mutex_unlock(&vis_fd_lock);

	return candidate;
}

/*
 * find_fake_fd - reserve an unused fake fd for a connection.
 *
 * Random candidates in [0, 5000) are drawn until one is found whose vis_fd
 * entry is not 1; that entry is then marked as used.  The search and the
 * marking happen under vis_fd_lock.
 *
 * Returns the reserved fd.
 */
int find_fake_fd()
{
	int candidate;

	pthread_mutex_lock(&vis_fd_lock);
	for (;;) {
		candidate = rand() % (5000);
		if (vis_fd[candidate] != 1)
			break;
	}
	vis_fd[candidate] = 1;
	pthread_mutex_unlock(&vis_fd_lock);

	return candidate;
}

int epfd_delete(int epfd, int fd)
{
	int ret = 0;
	struct linked_fd *p, *q;

	p = ep_link[epfd].head;
	while (p) {
		if (p->tfd == fd) {
			ret = 1;
			break;
		}
		q = p;
		p = p->next;
	}
	if (ret == 1) {
		if (p == ep_link[epfd].head)
			ep_link[epfd].head = p->next;
		else
			q->next = p->next;
		free(p);
	}
	return ret;
}

// hashmap ops

/*
 * isPrime - probabilistic primality check (Fermat test with 9 random bases).
 *
 * Returns 1 if val is 2 or 3, or if it is odd and base^(val-1) mod val == 1
 * for nine bases drawn with rand() from [2, val-3]; returns 0 otherwise.
 * Values below 2 and even values above 2 are rejected outright.
 *
 * A return of 1 means "probably prime": composites that pass the Fermat test
 * for every base tried (e.g. Carmichael numbers) are reported as prime.
 *
 * The modular products are computed in unsigned long long, which is exact
 * for val up to 2^32 (assuming a 64-bit unsigned long long).
 */
static unsigned long isPrime(unsigned long val) {
  int round;

  if (val < 2)
	return 0;
  if (val < 4)
	return 1;	/* 2 and 3 */
  if ((val & 1) == 0)
	return 0;

  /* from here on val >= 5, so val - 4 >= 1 and the base range is valid */
  for (round = 0; round < 9; round++) {
	unsigned long long base = ((unsigned long)rand() % (val - 4)) + 2;
	unsigned long long acc = 1;
	unsigned long exp = val - 1;

	/* square-and-multiply: acc = base^(val-1) mod val */
	while (exp) {
	  if (exp & 1)
		acc = (acc * base) % val;
	  base = (base * base) % val;
	  exp >>= 1;
	}
	if (acc != 1)
	  return 0;
  }
  return 1;
}

/*
 * findPrimeGreaterThan - smallest odd number above val that isPrime()
 * accepts.
 *
 * The search starts at val + 1 for even val and at val + 2 for odd val, and
 * then advances in steps of 2.
 */
static int findPrimeGreaterThan(int val) {
  int candidate = (val & 1) ? val + 2 : val + 1;

  for (; !isPrime(candidate); candidate += 2)
	;
  return candidate;
}

/*
 * rehash - grow the hash map.
 *
 * A new zeroed table is allocated whose size is the value returned by
 * findPrimeGreaterThan(2 * old size); every entry of the old table whose
 * flags equal HASHMAP_ACTIVE is re-inserted (from the last slot down to the
 * first) and the old table is freed.
 */
static void rehash(HashMap* hm) {
  hEntry* old_table = hm->table;
  long old_size = hm->size;
  long slot;

  hm->size = findPrimeGreaterThan(old_size<<1);
  hm->table = (hEntry*)calloc(sizeof(hEntry), hm->size);
  hm->count = 0;

  for (slot = old_size - 1; slot >= 0; slot--) {
	if (old_table[slot].flags == HASHMAP_ACTIVE)
	  hashmapInsert(hm, old_table[slot].data, old_table[slot].key);
  }

  free(old_table);
}

/*
 * hashmapCreate - allocate an empty hash map.
 *
 * startsize : requested number of slots; 0 selects the default of 1021,
 *             any other value is replaced by
 *             findPrimeGreaterThan(startsize - 2).
 *
 * Only table, size and count are initialised here; any other HashMap member
 * (e.g. the lock) is left for the caller to set up.
 *
 * Returns the new map, or NULL if memory could not be allocated.
 */
HashMap* hashmapCreate(int startsize) {
  HashMap* hm = (HashMap*)malloc(sizeof(HashMap));

  if (hm == NULL)
	return NULL;

  if (!startsize)
	startsize = 1021; // must be 1021
  else
	startsize = findPrimeGreaterThan(startsize-2);

  /* calloc zeroes the table, so every slot starts out inactive */
  hm->table = (hEntry*)calloc(startsize, sizeof(hEntry));
  if (hm->table == NULL) {
	free(hm);
	return NULL;
  }
  hm->size = startsize;
  hm->count = 0;
  return hm;
}

/*
 * hashmapInsert - store data under key, replacing the value of an existing
 * active entry with the same key.
 *
 * The table is grown first when it is completely full.  Slots are probed
 * starting at key % size with a step of (key % (size - 2)) + 1; the first
 * slot that is not active receives the entry.  If every probe hits an active
 * slot with a different key the table is grown and the search is repeated.
 */
void hashmapInsert(HashMap* hash, const int data, unsigned long key) {
  if (hash->size <= hash->count)
	rehash(hash);

  for (;;) {
	long slot = key % hash->size;
	long stride = (key % (hash->size-2)) + 1;
	long probes;

	for (probes = 0; probes < hash->size; probes++) {
	  if (!(hash->table[slot].flags & HASHMAP_ACTIVE)) {
		/* free (or previously removed) slot: claim it */
		hash->table[slot].flags |= HASHMAP_ACTIVE;
		hash->table[slot].data = data;
		hash->table[slot].key = key;
		++hash->count;
		return;
	  }
	  if (hash->table[slot].key == key) {
		/* key already present: overwrite its value */
		hash->table[slot].data = data;
		return;
	  }
	  slot = (slot + stride) % hash->size;
	}

	/* it should not be possible that we EVER come this far, but unfortunately
	   not every generated prime number is prime (Carmichael numbers...) */
	rehash(hash);
  }
}

/*
 * hashmapRemove - deactivate the entry stored under key.
 *
 * Probing follows the same sequence as insertion and stops at the first slot
 * whose data is 0 (treated as never used).
 *
 * Returns the data (tfd) of the removed entry, or 0 if the key is absent or
 * its entry was already removed.
 */
int hashmapRemove(HashMap* hash, unsigned long key) { // return tfd
  long slot = key % hash->size;
  long stride = (key % (hash->size-2)) + 1;
  long probes;

  for (probes = 0; probes < hash->size; probes++) {
	if (!hash->table[slot].data)
	  return 0;	/* found an empty place (can't be in) */

	if (hash->table[slot].key == key) {
	  if (!(hash->table[slot].flags & HASHMAP_ACTIVE))
		return 0;	/* in, but not active (i.e. deleted) */

	  hash->table[slot].flags &= ~HASHMAP_ACTIVE;
	  --hash->count;
	  return hash->table[slot].data;
	}
	slot = (slot + stride) % hash->size;
  }
  /* everything searched through, but not in */
  return 0;
}

/*
 * hashmapGet - look up the data (tfd) stored under key.
 *
 * Probing follows the same sequence as insertion.  The search ends at the
 * first slot holding the key, or at the first slot with a different key
 * whose data is 0.
 *
 * Returns the stored data if an active entry with that key is found, and 0
 * otherwise (also when the map is empty).
 */
int hashmapGet(HashMap* hash, unsigned long key) { // return tfd
  long slot, stride, probes;

  if (!hash->count)
	return 0;

  slot = key % hash->size;
  stride = (key % (hash->size-2)) + 1;

  for (probes = 0; probes < hash->size; probes++) {
	if (hash->table[slot].key == key)
	  return (hash->table[slot].flags & HASHMAP_ACTIVE)
		  ? hash->table[slot].data : 0;

	if (!hash->table[slot].data)
	  return 0;

	slot = (slot + stride) % hash->size;
  }
  return 0;
}

